/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.query.groupby.epinephelinae;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.aggregation.AggregatorFactory;

import javax.annotation.Nullable;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
import java.util.function.ToIntFunction;

/**
 * Groupers aggregate metrics from rows that they typically get from a ColumnSelectorFactory, under
 * grouping keys that some outside driver is passing in. They can also iterate over the grouped
 * rows after the aggregation is done.
 *
 * They work sort of like a map of KeyType to aggregated values, except they don't support
 * random lookups.
 *
 * See {@link VectorGrouper} for a vectorized version.
 *
 * @param <KeyType> type of the key that will be passed in
 */
public interface Grouper<KeyType> extends Closeable
{
  /**
   * Prepares this grouper for use. It has to be invoked before either {@link #aggregate(Object)} or
   * {@link #aggregate(Object, int)} is called.
   */
  void init();

  /**
   * Reports whether {@link #init()} has already taken effect on this grouper.
   *
   * @return true when the grouper is initialized, false when it is not
   */
  boolean isInitialized();

  /**
   * Aggregates the current row under the supplied key, using a hash the caller has already computed.
   * Thread-safety depends on the implementation: some are thread-safe, some are not.
   *
   * @param key     key object
   * @param keyHash value obtained by applying {@link #hashFunction()} to {@code key}
   *
   * @return an ok result when the row was aggregated, a not-ok result when a resource limit was hit
   */
  AggregateResult aggregate(KeyType key, int keyHash);

  /**
   * Aggregates the current row under the supplied key. This convenience overload hashes the key with
   * {@link #hashFunction()} and forwards to {@link #aggregate(Object, int)}. Thread-safety depends on the
   * implementation: some are thread-safe, some are not.
   *
   * @param key key object; a null key is rejected before any hashing happens
   *
   * @return an ok result when the row was aggregated, a not-ok result when a resource limit was hit
   */
  default AggregateResult aggregate(KeyType key)
  {
    Preconditions.checkNotNull(key, "key");
    final int keyHash = hashFunction().applyAsInt(key);
    return aggregate(key, keyHash);
  }

  /**
   * Puts the grouper back into its initial state.
   */
  void reset();

  /**
   * Supplies the function that turns a key into the hash expected by {@link #aggregate(Object, int)}.
   * Unless overridden, hashing is delegated to {@code Groupers.hashObject}.
   *
   * @return hash function for keys of this grouper
   */
  default ToIntFunction<KeyType> hashFunction()
  {
    return Groupers::hashObject;
  }

  /**
   * Closes the grouper and frees the resources associated with it. Unlike {@link Closeable#close()}, this
   * declaration does not throw any checked exception.
   */
  @Override
  void close();

  /**
   * Returns an iterator over the grouped entries.
   * <p>
   * Writes after this method has been called are permitted by some implementations only. Once you have finished with
   * the returned iterator, call {@link #close()} when the Grouper is no longer needed, or {@link #reset()} when it is
   * going to be reused. Asking for another iterator through {@link #iterator(boolean)} is likewise supported by some
   * implementations only. In every case, this method must not be invoked by several threads at the same time.
   * <p>
   * When "sorted" is true, results come back in sorted order: deserialized objects are ordered by KeyType's natural
   * ordering, serialized objects by {@link KeySerde#bufferComparator()}. Woe be unto you if these comparators are not
   * equivalent.
   * <p>
   * Each returned {@link Entry} has to be processed and dropped right away, since some implementations reuse the key
   * objects.
   *
   * @param sorted whether results should be returned in sorted order
   *
   * @return iterator over the entries
   */
  CloseableIterator<Entry<KeyType>> iterator(boolean sorted);

  /**
   * One element produced by {@link Grouper#iterator(boolean)}: a key together with its values.
   */
  interface Entry<T>
  {
    /**
     * Returns the key of this entry. Some implementations reuse key objects between entries, so copy the key (see
     * {@link KeySerdeFactory#copyKey}) if it must be retained.
     */
    T getKey();

    /**
     * Returns the values held by this entry.
     */
    Object[] getValues();
  }

  /**
   * Creates {@link KeySerde} instances and offers helpers for copying keys and comparing entries.
   */
  interface KeySerdeFactory<T>
  {
    /**
     * Returns the threshold for the maximum dictionary size.
     *
     * @return max dictionary size
     */
    long getMaxDictionarySize();

    /**
     * Builds a fresh {@link KeySerde}. The returned object may carry state.
     */
    KeySerde<T> factorize();

    /**
     * Builds a fresh {@link KeySerde} that uses the supplied dictionary.
     */
    KeySerde<T> factorizeWithDictionary(List<String> dictionary);

    /**
     * Makes a copy of a key. Needed whenever the key of an {@link Entry} obtained from
     * {@link Grouper#iterator(boolean)} has to outlive the next call to next().
     */
    T copyKey(T key);

    /**
     * Returns a comparator over {@link Entry} objects. {@link Grouper#iterator(boolean)} relies on it when sorting is
     * enabled.
     *
     * @param forceDefaultOrder when set, the comparator orders by key in default lexicographic ascending order,
     *                          irrespective of any other conditions (e.g., presence of OrderBySpecs).
     *
     * @return comparator for entries
     */
    Comparator<Grouper.Entry<T>> objectComparator(boolean forceDefaultOrder);
  }

  /**
   * Handles serialization, deserialization and comparison of keys. Instances may be stateful and are not required
   * to be thread-safe.
   */
  interface KeySerde<T>
  {
    /**
     * Returns the size of the keys produced by {@link #toByteBuffer(Object)}. That size must be fixed.
     */
    int keySize();

    /**
     * Returns the class of the keys.
     */
    Class<T> keyClazz();

    /**
     * Returns the dictionary used by this KeySerde. Implementations should never return null.
     */
    List<String> getDictionary();

    /**
     * Returns an estimate of the size of this KeySerde's dictionary.
     */
    Long getDictionarySize();

    /**
     * Serializes a key. {@link Grouper#aggregate(Object)} invokes this method and does not hold on to the buffer
     * once it returns, so implementations are free to reuse buffers.
     * <p>
     * A null return value signals that an internal resource limit has been reached and that no further keys can be
     * generated. When that happens you may call {@link #reset()} and retry, but beware the caveats on that method.
     *
     * @param key key object
     *
     * @return the serialized key, or null when resource limits prevent serializing more keys
     */
    @Nullable
    ByteBuffer toByteBuffer(T key);

    /**
     * Creates a key object that can be reused across calls to {@link #readFromByteBuffer}.
     */
    T createKey();

    /**
     * Deserializes a key out of a buffer into the given key object. {@link Grouper#iterator(boolean)} invokes this
     * method.
     *
     * @param key      object obtained from {@link #createKey()}
     * @param buffer   buffer that holds the key
     * @param position position in the buffer at which the key starts
     */
    void readFromByteBuffer(T key, ByteBuffer buffer, int position);

    /**
     * Returns a comparator for two serialized keys. {@link Grouper#iterator(boolean)} relies on it when sorting is
     * enabled.
     *
     * @return comparator for keys
     */
    BufferComparator bufferComparator();

    /**
     * Variant of {@link #bufferComparator()} for limit push-down, where aggregated values may have to be compared in
     * addition to the key.
     *
     * @param aggregatorFactories Array of aggregators from a GroupByQuery
     * @param aggregatorOffsets   For each aggregator in aggregatorFactories, the offset of its location within the
     *                            grouping key + aggs buffer.
     *
     * @return comparator for keys + aggs
     */
    BufferComparator bufferComparatorWithAggregators(AggregatorFactory[] aggregatorFactories, int[] aggregatorOffsets);

    /**
     * Decorates the object mapper so that it can read and write the grouping keys of query results. The
     * {@link SpillingGrouper} uses it to keep the types of the dimensions intact when they are serialized to, and
     * deserialized from, the spilled files. The default implementation hands back the mapper it was given.
     */
    default ObjectMapper decorateObjectMapper(ObjectMapper spillMapper)
    {
      return spillMapper;
    }

    /**
     * Puts the keySerde back into its initial state. Once this has been called, {@link #readFromByteBuffer} and
     * {@link #bufferComparator()} may stop working properly for keys that were serialized earlier.
     */
    void reset();
  }

  /**
   * Compares two serialized values that live inside byte buffers; this is the type returned by
   * {@link KeySerde#bufferComparator()} and {@link KeySerde#bufferComparatorWithAggregators}.
   */
  interface BufferComparator
  {
    /**
     * Compares the value at {@code lhsPosition} of {@code lhsBuffer} with the value at {@code rhsPosition} of
     * {@code rhsBuffer}.
     *
     * @return comparison result; presumably follows the {@link Comparator#compare} sign convention (TODO confirm
     * against implementations)
     */
    int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition);
  }
}
